package org.zjvis.datascience.common.etl;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zjvis.datascience.common.constant.SqlTemplate;
import org.zjvis.datascience.common.enums.ETLEnum;
import org.zjvis.datascience.common.enums.SubTypeEnum;
import org.zjvis.datascience.common.sql.SqlHelper;
import org.zjvis.datascience.common.util.ToolUtil;
import org.zjvis.datascience.common.vo.TaskVO;

import java.util.List;
import java.util.Set;

/**
 * @description ETL-Sample: data sampling node. Builds a "create table as select" statement
 *              that copies a random subset of rows from the first input table into the
 *              output table, either by percentage or by an explicit row count.
 * @date 2021-12-27
 */
public class Sample extends BaseETL {

    private final static Logger logger = LoggerFactory.getLogger("Sample");

    // e.g. select * from sales_fact_sample_flat order by random() limit 10
    private static final String FILTER_SQL = "SELECT * FROM %s ORDER BY random() LIMIT %s";

    /** Sampling mode: take a percentage of the input rows. */
    private static final int MOD_RATIO = 1;

    /** Sampling mode: take an explicit number of rows. */
    private static final int MOD_COUNT = 2;

    // 1 = sample by percentage; 2 = sample an explicit number of rows
    private int mod;

    // percentage of rows to keep (divided by 100.0 when the SQL is built)
    private int ratio;

    // number of rows to keep, as parsed from the configuration
    private int count;

    public Sample() {
        super(ETLEnum.SAMPLE.name(), SubTypeEnum.ETL_OPERATE.getVal(), SubTypeEnum.ETL_OPERATE.getDesc());
        this.maxParentNumber = 1;
    }

    /**
     * Reads the sampling settings ("mod", "ratio", "count") from the node configuration.
     * A missing key yields 0 for that field instead of failing on null unboxing.
     *
     * @param conf node configuration
     */
    public void parserConf(JSONObject conf) {
        // getIntValue returns 0 for an absent key; getInteger would return null and the
        // implicit unboxing into an int field would throw NullPointerException.
        this.mod = conf.getIntValue("mod");
        this.ratio = conf.getIntValue("ratio");
        this.count = conf.getIntValue("count");
    }

    /**
     * Builds the SQL that materialises the sampled rows into the output table.
     * Uses the {@code mod} and {@code ratio} fields, so it relies on
     * {@link #parserConf(JSONObject)} having been called beforehand.
     *
     * @param conf       node configuration; "input" and "output" must each hold at least one
     *                   entry with a "tableName"
     * @param sqlHelpers not used by this node
     * @param timeStamp  suffix appended to the output table name and passed to
     *                   {@code ToolUtil.alignTableName} for the input table
     * @param engineName stored on this instance as the engine name
     * @return the statement produced from {@code SqlTemplate.CREATE_TABLE_SQL}, or
     *         {@code null} when input/output is missing or the sampling mode is unsupported
     */
    public String initSql(JSONObject conf, List<SqlHelper> sqlHelpers, long timeStamp, String engineName) {
        this.engineName = engineName;
        JSONArray input = conf.getJSONArray("input");
        if (input == null || input.size() == 0) {
            logger.error("the input which is going to be sampled is empty");
            return null;
        }
        String tableName = input.getJSONObject(0).getString("tableName");
        tableName = ToolUtil.alignTableName(tableName, timeStamp);
        logger.debug("sampling input table is {}", tableName);

        JSONArray output = conf.getJSONArray("output");
        if (output == null || output.size() == 0) {
            logger.error("sample configuration output is empty!!!");
            return null;
        }
        String outTable = output.getJSONObject(0).getString("tableName") + timeStamp;

        String selectSql;
        if (mod == MOD_RATIO) {
            // LIMIT <fraction> * (total row count of the input table)
            double ratiof = ratio / 100.0;
            String limitExpr = String.format("%s * (select count(*) as total from %s)", ratiof, tableName);
            selectSql = String.format(FILTER_SQL, tableName, limitExpr);
        } else if (mod == MOD_COUNT) {
            // the row count is read from the conf passed to this call, not from the parsed field
            selectSql = String.format(FILTER_SQL, tableName, conf.getInteger("count"));
        } else {
            logger.warn("unsupported sample mod {}, no sql generated", mod);
            return null;
        }
        logger.debug("ETL/sample is going to execute sql: {}", selectSql);
        return String.format(SqlTemplate.CREATE_TABLE_SQL, outTable, selectSql);
    }

    /**
     * Fills the "output" section of the task data with a single entry that mirrors the
     * columns, column types and number formats of the first input. Does nothing when the
     * task has no input.
     *
     * @param vo task whose data is read and updated in place
     */
    public void defineOutput(TaskVO vo) {
        JSONObject jsonObject = vo.getData();
        JSONArray output = new JSONArray();
        String tableName = String.format(SqlTemplate.OUT_TABLE_NAME, "sample", vo.getPipelineId(), vo.getId());
        JSONArray input = jsonObject.getJSONArray("input");
        if (input == null || input.size() == 0) {
            return;
        }
        JSONObject firstInput = input.getJSONObject(0);
        JSONArray cols = firstInput.getJSONArray("tableCols");
        JSONArray outputColTypes = firstInput.getJSONArray("columnTypes");

        JSONObject numberFormat = firstInput.getJSONObject("numberFormat");
        if (numberFormat != null && cols != null && !cols.isEmpty()) {
            // Drop formats for columns that are no longer present. Removal goes through the
            // key-set view (iterator-based) rather than calling numberFormat.remove() inside
            // a for-each over keySet(), which throws ConcurrentModificationException.
            // As before, this prunes the input's numberFormat object in place.
            numberFormat.keySet().retainAll(cols);
        }

        JSONObject item = new JSONObject();
        item.put("numberFormat", numberFormat);
        item.put("tableName", tableName);
        item.put("tableCols", cols);
        item.put("nodeName", vo.getName() == null ? ETLEnum.SAMPLE.toString() : vo.getName());
        item.put("columnTypes", outputColTypes);
        this.setSubTypeForOutput(item);
        output.add(item);
        jsonObject.put("output", output);
        vo.setData(jsonObject);
    }

    /**
     * Seeds a new node's configuration with default sampling settings
     * (mod = 1, ratio = 1, count = 1) and then applies the base template.
     *
     * @param data configuration object to populate
     */
    public void initTemplate(JSONObject data) {
        data.put("mod", 1);
        data.put("ratio", 1);
        data.put("count", 1);
        baseInitTemplate(data);
    }
}
